<html>
    <head>
        <meta charset="utf-8">
        <title>Celery用户手册 - Tasks</title>
        <link rel="stylesheet" href="../assets/stylesheets/global.css">
        <link rel="stylesheet" href="../assets/stylesheets/words.css">
        <link rel="stylesheet" href="../assets/stylesheets/monokai.css">
        <link rel="stylesheet" href="../assets/stylesheets/table.css">
        <link rel="shortcut icon" href="../assets/images/favicon.ico" type="image/x-icon">
        <link rel="icon" href="../assets/images/favicon.ico" type="image/x-icon">
        <script>
            // 统计代码
            (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
                (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
                m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
                })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');

            ga('create', 'UA-93231524-1', 'auto');
            ga('send', 'pageview');
        </script>
    </head>
    <body>
        <div id="header">
            <a href="../index.html"><div id="logo">JG</div></a>
        </div>
        <div id="container" class="typo">
            <div id="article">
                <h1>Celery用户手册 - Tasks</h1>
                <h4>Posted April 19, 2016</h4>

                <p><code>Tasks</code>是Celery 应用的构建块。事实上Celery应用是由一个或多个Task拼装组成的。</p>

<p>一个Task即是一个对象， Task被创建后可以被所有调用， 它是双重角色， 当Task被调用可以通过Task可以发送消息， 同时当作为一个worker的时候可以接收消息，并消费。</p>

<p>每个Task name 都是唯一的， 这样可以通过这个名字，找到合适的function去执行消费。</p>

<p>当发送一个任务消息在worker确认(acknowledged)前不会消失，一个worker可以提前存储很多消息，如果worker进程崩溃或killed，消息也不会消失， 消息会通过在投递的方式给其他存活worker。</p>

<p>理想的Task函数必须是幂等的，这意味着相同的参数调用多次不会出现不同的结果。但是worker并不知道函数是幂等的， woker默认是提前确认消息， 在执行完成之前这个task永远不会被重复执行。 这个就是上锁(LOCK)意思。这一段和上一段还是有区别的， 这一段强调的是<strong>开始执行之前确认</strong>。</p>

<p>当然确认如果任务是幂等的，你可以设置<code>acks_late</code>选项来控制worker 在函数返回之后去确认消息<code>acknowledge</code>. 请参考: <a href="http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry%20%22%22">Should I use retry or acks_late?</a></p>

<p>在这一章节， 你将学习所有关于任务的定义，以下为目录:</p>
<blockquote class="blockquote-normal">
                <ul><br/><li><a href="#basics">Basics</a></li><br/><li><a href="#names">Names</a></li><br/><li><a href="#context">Context</a></li><br/><li><a href="#logging">Logging</a></li><br/><li><a href="#retrying">Retrying</a></li><br/><li><a href="#listofoptions">List of Options</a></li><br/><li><a href="#states">States</a></li><br/><li><a href="#semipredicates">Semipredicates</a></li><br/><li><a href="#customtaskclasses">Custom task classes</a></li><br/><li><a href="#howitworks">How it works</a></li><br/><li><a href="#tips">Tips and Best Practices</a></li><br/><li><a href="#performance">Performance and Strategies</a></li><br/><li><a href="#example">Example</a></li><br/></ul><br/></blockquote>
<h2>Basics <span id="basics"></span></h2>

<p>你可以很容易的创建任务在任何的可调用函数上使用<code>task()</code>装饰器.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">.models</span> <span class="kn">import</span> <span class="n">User</span>

<span class="nd">@app.task</span>
<span class="k">def</span> <span class="nf">create_user</span><span class="p">(</span><span class="n">username</span><span class="p">,</span> <span class="n">password</span><span class="p">):</span>
    <span class="n">User</span><span class="o">.</span><span class="n">objects</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">username</span><span class="o">=</span><span class="n">username</span><span class="p">,</span> <span class="n">password</span><span class="o">=</span><span class="n">password</span><span class="p">)</span>
</pre></div>
</div>
<p>可以在装饰器上指定参数， 来设置<code>Task</code></p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span><span class="p">(</span><span class="n">serializer</span><span class="o">=</span><span class="s1">&#39;json&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">create_user</span><span class="p">(</span><span class="n">username</span><span class="p">,</span> <span class="n">password</span><span class="p">):</span>
    <span class="n">User</span><span class="o">.</span><span class="n">objects</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">username</span><span class="o">=</span><span class="n">username</span><span class="p">,</span> <span class="n">password</span><span class="o">=</span><span class="n">password</span><span class="p">)</span>
</pre></div>
</div>
<p><strong>Multiple decorators</strong></p>

<p>当使用多个装饰器，需要确保任务装饰生效， 把<code>task decorator</code> 写在函数第一个装饰器.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span>
<span class="nd">@decorator2</span>
<span class="nd">@decorator1</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>
</pre></div>
</div>
<p><strong>如何引入task装饰器？</strong></p>

<p><code>task decorator</code> 存在于你的Celery应用的实例上， 上一节我们已经讲过如何声明Application和使用它. </p>

<p>如果你使用的是Django 或者仍然适用老的版本， 你可能导入<code>task decorator</code>的方式是下面这样.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">celery</span> <span class="kn">import</span> <span class="n">task</span>

<span class="nd">@task</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>
</pre></div>
</div>
<h2>Names <span id="names"></span></h2>

<p>每个任务都有一个唯一的名称， 一个任务创建时如果不提供一个自定义的名字， 将会去生成一个任务.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="o">&gt;&gt;&gt;</span> <span class="nd">@app.task</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="s1">&#39;sum-of-two-numbers&#39;</span><span class="p">)</span>
<span class="o">&gt;&gt;&gt;</span> <span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
<span class="o">...</span>     <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>

<span class="o">&gt;&gt;&gt;</span> <span class="n">add</span><span class="o">.</span><span class="n">name</span>
<span class="s1">&#39;sum-of-two-numbers&#39;</span>
</pre></div>
</div>
<p>最好的方式是适用模块名称作为一个名称空间，如果一个任务另外一个模块中也有这样的名称如user模块中有<code>add</code>, group 模块中也有<code>add</code>, 那么这样就会冲突. 最好的方式如下:</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="o">&gt;&gt;&gt;</span> <span class="nd">@app.task</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="s1">&#39;tasks.add&#39;</span><span class="p">)</span>
<span class="o">&gt;&gt;&gt;</span> <span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
<span class="o">...</span>     <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>
</pre></div>
</div>
<p>你可以通过调用task的属性来获取task name.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="o">&gt;&gt;&gt;</span> <span class="n">add</span><span class="o">.</span><span class="n">name</span>
<span class="s1">&#39;tasks.add&#39;</span>
</pre></div>
</div>
<p>如果不指定， 默认也会通过模块名和函数名拼装生成name</p>

<p><code>tasks.py</code>:</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>

<span class="o">&gt;&gt;&gt;</span> <span class="kn">from</span> <span class="nn">tasks</span> <span class="kn">import</span> <span class="n">add</span>
<span class="o">&gt;&gt;&gt;</span> <span class="n">add</span><span class="o">.</span><span class="n">name</span>
<span class="s1">&#39;tasks.add&#39;</span>
</pre></div>
</div>
<p><strong>自动命名和相对import</strong></p>

<p>相对import和自动命名不能一起工作， 所以如果适用相对引入你必须精确的设置name.</p>

<p>如果一个客户端(创建消息的时候) 导入这个<code>myapp.tasks</code> 通过<code>.tasks</code>导入，另外一个worker导入模块通过<code>myapp.tasks</code>， 生成的名称不匹配导致worker会抛出<code>NotRegistered</code> 从而不能工作.</p>

<p>Django INSTALLED_APPS的<code>project.myapp</code>风格.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="n">INSTALLED_APPS</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;project.myapp&#39;</span><span class="p">]</span>
</pre></div>
</div>
<p>如果你安装app使用<code>project.myapp</code>, 那么task导入的时候也要通过<code>project.myapp.tasks</code>导入， 所以你要确保总是使用相同的名称导入任务.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="o">&gt;&gt;&gt;</span> <span class="kn">from</span> <span class="nn">project.myapp.tasks</span> <span class="kn">import</span> <span class="n">mytask</span>   <span class="c1"># &lt;&lt; GOOD</span>

<span class="o">&gt;&gt;&gt;</span> <span class="kn">from</span> <span class="nn">myapp.tasks</span> <span class="kn">import</span> <span class="n">mytask</span>    <span class="c1"># &lt;&lt; BAD!!!</span>
</pre></div>
</div>
<p>上面第二个例子将导致任务以不同的方式命名， 进而导致客户端和worker不用的任务名称。</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="o">&gt;&gt;&gt;</span> <span class="kn">from</span> <span class="nn">project.myapp.tasks</span> <span class="kn">import</span> <span class="n">mytask</span>
<span class="o">&gt;&gt;&gt;</span> <span class="n">mytask</span><span class="o">.</span><span class="n">name</span>
<span class="s1">&#39;project.myapp.tasks.mytask&#39;</span>

<span class="o">&gt;&gt;&gt;</span> <span class="kn">from</span> <span class="nn">myapp.tasks</span> <span class="kn">import</span> <span class="n">mytask</span>
<span class="o">&gt;&gt;&gt;</span> <span class="n">mytask</span><span class="o">.</span><span class="n">name</span>
<span class="s1">&#39;myapp.tasks.mytask&#39;</span>
</pre></div>
</div>
<p>因而你导入必须一致， 这也是python推荐的方式。</p>

<p>同样的你不应该使用旧风格进行相对引入.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">module</span> <span class="kn">import</span> <span class="n">foo</span>   <span class="c1"># BAD!</span>

<span class="kn">from</span> <span class="nn">proj.module</span> <span class="kn">import</span> <span class="n">foo</span>  <span class="c1"># GOOD!</span>
</pre></div>
</div>
<p>以下新的风格相对引入也是可以推荐的</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">.module</span> <span class="kn">import</span> <span class="n">foo</span>  <span class="c1"># GOOD!</span>
</pre></div>
</div>
<p>如果你的程序已经做了错的引入， 并且你没有时间去重构， 建议通过显式的指定名称去覆盖自动命名.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@task</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="s1">&#39;proj.tasks.add&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>
</pre></div>
</div>
<h2>Context <span id="context"></span></h2>

<p>执行任务<code>request</code> 包含的信息和状态.</p>

<p><code>request</code>定义了以下属性</p>
<div class="table-wrapper"><table class="ui selectable celled table"><thead>
<tr>
<th>key</th>
<th>value</th>
</tr>
</thead>

<tbody>
<tr>
<td><strong>id:</strong></td>
<td>执行任务的唯一ID</td>
</tr>
<tr>
<td><strong>group:</strong></td>
<td>组id如果属于组</td>
</tr>
<tr>
<td><strong>chord:</strong></td>
<td>The unique id of the chord this task belongs to (if the task is part of the header).</td>
</tr>
<tr>
<td><strong>args:</strong></td>
<td>位置参数</td>
</tr>
<tr>
<td><strong>kwargs:</strong></td>
<td>键值参数</td>
</tr>
<tr>
<td><strong>retries:</strong></td>
<td>重拾次数</td>
</tr>
<tr>
<td><strong>is_eager:</strong></td>
<td>如果不是工人是本地客户端，设置为True</td>
</tr>
<tr>
<td><strong>eta:</strong></td>
<td>预计任务时间</td>
</tr>
<tr>
<td><strong>expires:</strong></td>
<td>任务的过期时</td>
</tr>
<tr>
<td><strong>logfile:</strong></td>
<td>worker 的日志文件, See: <a href="#logging">logging</a></td>
</tr>
<tr>
<td><strong>loglevel:</strong></td>
<td>当前使用的日志级别</td>
</tr>
<tr>
<td><strong>hostname:</strong></td>
<td>worker实例的hostname</td>
</tr>
<tr>
<td><strong>delivery_info:</strong></td>
<td>额外的传递信息</td>
</tr>
<tr>
<td><strong>called_directly:</strong></td>
<td>This flag is set to true if the task was not executed by the worker.</td>
</tr>
<tr>
<td><strong>callbacks:</strong></td>
<td>回调函数</td>
</tr>
<tr>
<td><strong>errback:</strong></td>
<td>异常回调函数</td>
</tr>
<tr>
<td><strong>utc:</strong></td>
<td>如果为True说明调用者启动了utc</td>
</tr>
</tbody>
</table></div>
<p>3.1 新属性
key|value
---|---
<strong>headers:</strong>|映射消息头
<strong>reply_to:</strong>|发送replay到哪个队列
<strong>correlation_id:</strong>|通常与任务id通用， 常用语amqp的跟踪回复</p>

<p>一个从context获取获取信息的例子:</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span><span class="p">(</span><span class="n">bind</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">dump_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">print</span><span class="p">(</span><span class="s1">&#39;Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
            <span class="bp">self</span><span class="o">.</span><span class="n">request</span><span class="p">))</span>
</pre></div>
</div>
<h2>Logging <span id="logging"></span></h2>

<p>worker 将会自动配置logging， 也可以手动配置定制logging 日志输出.</p>

<p>Celery 提供一个名为<code>celery.task</code>的logger供使用, 你可以通过这个logger 自动的生成一个名称和唯一id作为日志的一部分.</p>

<p>推荐在每个模块中都声明一个logger， 每个模块使用单独的logger.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">celery.utils.log</span> <span class="kn">import</span> <span class="n">get_task_logger</span>

<span class="n">logger</span> <span class="o">=</span> <span class="n">get_task_logger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>

<span class="nd">@app.task</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Adding {0} + {1}&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">))</span>
    <span class="k">return</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span>
</pre></div>
</div>
<p>Celery 使用Python标准库的logging模块， 文档支持可以在<a href="http://docs.python.org/dev/library/logging.html#module-logging">logging</a> 模块中看到</p>

<p>你也可以使用<code>print()</code>, 任何写入标准输出和标准错误都会转到日志系统。 所以print的字符也会作为日志记录， 记录等级为<strong>WARN</strong>.</p>

<h2>Retrying <span id="retrying"></span></h2>

<p><code>retry()</code> 可以重试任务， 当任务出现可恢复的错误.</p>

<p>当调用<code>retry()</code>时将会发送一个新的消息， 使用相同的task-id,  确保消息和原始任务属于相同的队列.</p>

<p>当一个消息重试后， 任务也会记录一个状态。这样你可以使用结果实例跟踪任务状态记录(see <a href="#states">States</a>)</p>

<p>一个使用<code>retry()</code> 的例子:</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span><span class="p">(</span><span class="n">bind</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">send_twitter_status</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">oauth</span><span class="p">,</span> <span class="n">tweet</span><span class="p">):</span>
    <span class="k">try</span><span class="p">:</span>
        <span class="n">twitter</span> <span class="o">=</span> <span class="n">Twitter</span><span class="p">(</span><span class="n">oauth</span><span class="p">)</span>
        <span class="n">twitter</span><span class="o">.</span><span class="n">update_status</span><span class="p">(</span><span class="n">tweet</span><span class="p">)</span>
    <span class="k">except</span> <span class="p">(</span><span class="n">Twitter</span><span class="o">.</span><span class="n">FailWhaleError</span><span class="p">,</span> <span class="n">Twitter</span><span class="o">.</span><span class="n">LoginError</span><span class="p">)</span> <span class="k">as</span> <span class="n">exc</span><span class="p">:</span>
        <span class="k">raise</span> <span class="bp">self</span><span class="o">.</span><span class="n">retry</span><span class="p">(</span><span class="n">exc</span><span class="o">=</span><span class="n">exc</span><span class="p">)</span>
</pre></div>
</div>
<p><code>bind</code> 参数告诉装饰器将会给一个可以访问的self实例.</p>

<p>当存储任务结果，exc 是用于传递异常信息用户日志输出。 内容有异常信息和traceback信息都存在于exc.</p>

<p>如果此任务有<code>max_retries</code>值， 并且重试次数超过了这个值， 那么这个exc异常将会重新raise.  如果是下列情况将不会这样:</p>

<ul>
<li>exc 没有指定</li>
</ul>

<p>这种情况下将会raise <code>MaxRetriesExceeded</code>异常, 这个是默认异常</p>

<ul>
<li>没有异常</li>
</ul>

<p>当重试没有异常发生(也就是上面except没有发生)， 重试次数达到了， 但task还没有正确返回， 可以指定给exc一个异常， 用于代理默认的<code>MaxRetriesExceeded</code>.</p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="bp">self</span><span class="o">.</span><span class="n">retry</span><span class="p">(</span><span class="n">exc</span><span class="o">=</span><span class="n">Twitter</span><span class="o">.</span><span class="n">LoginError</span><span class="p">())</span>
</pre></div>
</div>
<p>将会触发提供的异常信息。 </p>

<p><strong>自定义重试间隔</strong></p>

<p>当一个任务要去重试， 可以指定一个时间之后再去重试. 使用<code>default_retry_delay</code>属性来设置默认延迟.默认是三分钟,  注意: 延迟的单位是秒.</p>

<p>你可以使用 <code>retry(..., countdown=60s)</code>来覆盖task级别的<code>default_retry_delay</code>时间. 两种方法灵活使用 </p>
<div class="code-wrapper"><span class="lang-label">Python</span><div class="highlight"><pre><span></span><span class="nd">@app.task</span><span class="p">(</span><span class="n">bind</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> <span class="n">default_retry_delay</span><span class="o">=</span><span class="mi">30</span> <span class="o">*</span> <span class="mi">60</span><span class="p">)</span>  <span class="c1"># retry in 30 minutes.</span>
<span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">):</span>
    <span class="k">try</span><span class="p">:</span>
        <span class="err">…</span>
    <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">exc</span><span class="p">:</span>
        <span class="k">raise</span> <span class="bp">self</span><span class="o">.</span><span class="n">retry</span><span class="p">(</span><span class="n">exc</span><span class="o">=</span><span class="n">exc</span><span class="p">,</span> <span class="n">countdown</span><span class="o">=</span><span class="mi">60</span><span class="p">)</span>  <span class="c1"># override the default and                                                 # retry in 1 minute</span>
</pre></div>
</div>
<h2>List of Options <span id="listofoptions"></span></h2>

<p>Task.<strong>name</strong></p>

<p>task 注册名</p>

<p>可以手动设置，也可以生成此name. See: <a href="#names">Names</a></p>

<p>未完待续&gt;&gt;&gt;</p>

            </div>
            <div id="footer">
                <a href="../words.html"><div id="more-words">MORE WORDS</div></a>
            </div>
        </div>
    </body>
</html>